package com.atguigu;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo job that reads the MySQL table {@code base_trademark} through the
 * {@code mysql-cdc} connector using Flink SQL and prints every row/change to stdout.
 *
 * <p>The table is declared with a DDL statement and then queried with
 * {@code select * from base_trademark}.
 */
public class FlinkCDC_SQL {

    /**
     * DDL that registers the CDC-backed source table.
     *
     * <p>NOTE(review): host, credentials and database name are hard-coded here;
     * consider externalizing them before using this outside a local demo.
     */
    private static final String CREATE_BASE_TRADEMARK_DDL = "" +
            "CREATE TABLE base_trademark ( " +
            "id INT, " +
            "tm_name STRING, " +
            "aaa STRING, " +
            "primary key(id) not enforced " +
            ") WITH (" +
            "'connector' = 'mysql-cdc'," +
            "'hostname' = '192.168.9.102'," +
            "'port' = '3306'," +
            "'username' = 'root'," +
            "'password' = '000000'," +
            "'database-name' = 'gmall-210927-flink'," +
            "'table-name' = 'base_trademark'" +
            ")";

    /**
     * Entry point: builds the table environment, registers the source table and
     * prints the result of a full-table query.
     *
     * @param args command-line arguments (unused)
     * @throws Exception kept for signature compatibility with existing callers
     */
    public static void main(String[] args) throws Exception {

        // 1. Obtain the execution environment (parallelism 1) and its table environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the source table via DDL.
        tableEnv.executeSql(CREATE_BASE_TRADEMARK_DDL);

        // 3. Run the query and print its rows.
        //    execute() submits the job by itself and print() consumes the result,
        //    so no separate env.execute() is required. No DataStream operators are
        //    registered on env, and calling env.execute() afterwards would fail with
        //    "No operators defined in streaming topology" once print() returned.
        tableEnv.sqlQuery("select * from base_trademark")
                .execute()
                .print();

    }

}
